package com.example.socket.filter.session;

import com.example.socket.core.Message;
import com.example.socket.core.Session;
import com.example.socket.core.SessionContext;
import com.example.socket.core.SessionKeys;
import com.example.socket.utils.IpUtils;
import io.netty.channel.Channel;
import io.netty.util.Attribute;

import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author frank
 * NETTY 连接SESSION
 */
public class NettySession implements Session, TransientSession {

    private long id;
    private Channel channel;
    private SocketAddress remoteAddress;
    private SessionContext context;
    private boolean destroyed;
    /** 是否短链接 */
    private boolean transitory;

    /** 延迟推送内容 */
    private Deque<Message> delayeds = new LinkedBlockingDeque<>();
    /** 接收的消息历史 */
    private Deque<Message> historys = new LinkedBlockingDeque<>();

    private AtomicInteger counter = new AtomicInteger(0);

    private AtomicInteger sn = new AtomicInteger();

    // ------------

    /** 是否短链接 */
    @Override
    public void setTransitory(boolean transitory) {
        this.transitory = transitory;
    }

    /** 是否短链接 */
    @Override
    public boolean isTransitory() {
        return transitory;
    }

    /**
     * 加入延迟消息列表
     * @param message
     */
    @Override
    public int addDelayed(Message message) {
        // 添加延迟
        delayeds.add(message);
        return delayeds.size();
    }

    /**
     * 获取并清除延迟消息列表
     */
    @Override
    public synchronized List<Message> fetchDelayed() {
        List<Message> list = new ArrayList<>(delayeds.size());
        while (!delayeds.isEmpty()) {
            Message e = delayeds.poll();
            list.add(e);
        }
        return list;
    }

    // ------------

    /**
     * 添加消息历史
     * @param message
     * @param capability
     */
    @Override
    public void addHistory(Message message, int capability) {
        historys.addLast(message);
        while (historys.size() > capability) {
            historys.removeFirst();
        }
    }

    /**
     * 获取消息历史
     * @param command
     * @param sn
     * @return
     */
    @Override
    public Message getHistory(int sn) {
        while (!historys.isEmpty()) {
            Message e = historys.poll();
            if (e.getSn() == sn) {
                return e;
            } else if (e.getSn() > sn) {
                break;
            }
        }
        return null;
    }

    /**
     * 清除历史记录
     */
    @Override
    public void clearHistory() {
        historys.clear();
    }

    // ------------

    /**
     * 绑定SESSION上下文
     * @param context
     */
    @Override
    public SessionContext bindContext(SessionContext context) {
        SessionContext old = this.context;
        this.context = context;

        // 设置管理后台标识
        Attribute<Boolean> attr = channel.attr(MANAGEMENT_KEY);
        Boolean managemant = attr.get();
        if (managemant == null || !managemant.booleanValue()) {
            managemant = false;
            return old;
        }
        String ip = IpUtils.getIp(this);
        SessionKeys.ATT_MANAGEMENT.setValue(this, ip);
        return old;
    }

    /**
     * 关闭当前通信会话
     */
    @Override
    public Future<?> close() {
        if (channel == null) {
            return null;
        }
        return channel.close();
    }

    /**
     * 向当前通信会话推送信息
     * @param msg
     */
    @Override
    public Future<?> write(Object msg) {
        if (channel == null) {
            return null;
        }
        Message message = (Message) msg;
        message.setSession(this.id);
        counter.getAndIncrement();
        return channel.writeAndFlush(message);
    }

    /**
     * 销毁当前通信会话
     */
    @Override
    public void destory() {
        // this.channel = null;
        // this.context = null;
        this.destroyed = true;
        this.delayeds.clear();
        this.historys.clear();

        // 移除SESSION
        Attribute<Session> attr = channel.attr(SESSION_KEY);
        attr.set(null);
    }

    // ------------

    @Override
    public long getId() {
        return id;
    }

    @Override
    public SessionContext getContext() {
        return context;
    }

    @Override
    public boolean isConnected() {
        if (destroyed) {
            return false;
        }
        if (channel == null) {
            return false;
        }
        return channel.isActive();
    }

    @Override
    public boolean isDestroyed() {
        return destroyed;
    }

    @Override
    public SocketAddress getRemoteAddress() {
        return remoteAddress;
    }

    /**
     * 分离 SESSION
     */
    @Override
    public void detach(Channel channel) {
        Attribute<Session> attr = channel.attr(SESSION_KEY);
        attr.set(null);
    }

    /**
     * 附加 SESSION
     */
    @Override
    public void attach(Channel channel, boolean first) {
        this.channel = channel;
        this.remoteAddress = channel.remoteAddress();
        Attribute<Session> attr = channel.attr(SESSION_KEY);
        attr.set(this);
    }

    // ---------

    /**
     * 构造函数
     * @param id SESSION ID
     * @param channel 连接
     */
    NettySession(long id, Channel channel) {
        this.id = id;
        attach(channel, true);
    }

    /**
     * 获取当前连接SESSION
     */
    public static NettySession client(long sessionId, Channel channel) {
        NettySession session = new NettySession(sessionId, channel);
        session.context = new SessionContext(UUID.randomUUID().toString());
        String ip = IpUtils.getIp(channel);
        SessionKeys.ATT_MANAGEMENT.setValue(session, ip);
        return session;
    }

    public AtomicInteger getCounter() {
        return counter;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        NettySession that = (NettySession) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    @Override
    public long getLastTime() {
        if (channel == null) {
            return 0;
        }
        Attribute<Boolean> attr = channel.attr(MANAGEMENT_KEY);
        Boolean mg = attr.get();
        if (mg != null && mg.booleanValue()) {
            return System.currentTimeMillis();
        }
        Attribute<Long> attrTime = channel.attr(ACCESS_TIME);
        if (attrTime == null || attrTime.get() == null) {
            return 0;
        }
        return attrTime.get();
    }

    @Override
    public int getSn() {
        return sn.get();
    }

    @Override
    public void setSn(int sn) {
        if (sn > this.sn.get()) {
            this.sn.set(sn);
        }
    }

}
